package cron

import (
	"context"
	"errors"
	"gitee.com/zackeus/go-boot/cron/consistenthash"
	"gitee.com/zackeus/go-boot/cron/core"
	"gitee.com/zackeus/go-boot/cron/drive"
	"gitee.com/zackeus/go-zero/core/logx"
	"sync"
	"sync/atomic"
	"time"
)

const (
	defaultReplicas = 50

	cronRunning = 1
	cronStopped = 0
)

var (
	ErrJobExist        = errors.New("jobName already exist")
	ErrJobNotExist     = errors.New("jobName not exist")
	ErrJobWrongNode    = errors.New("job is not running in this node")
	ErrCronIsNotSteady = errors.New("nodePool is not steady")
	ErrNodePoolIsNil   = errors.New("nodePool is nil")
)

type (
	// Cron is main struct
	Cron struct {
		mu       sync.RWMutex
		ctx      context.Context
		logger   logx.Logger
		stopFunc func()

		driver         drive.Driver
		events         chan *drive.Event
		hashRing       *consistenthash.Map
		runningLocally bool
		hashReplicas   int

		cr        *core.Cron
		crOptions []core.Option

		jobs    map[string]*JobWrapper
		running int32
	}
)

// New create a Cron with Cron Option
func New(opts ...Option) (*Cron, error) {
	/* 构建上下文 */
	ctx, stop := context.WithCancel(context.Background())

	c := &Cron{
		ctx:          ctx,
		logger:       logx.WithContext(ctx),
		stopFunc:     stop,
		jobs:         make(map[string]*JobWrapper),
		crOptions:    make([]core.Option, 0),
		hashReplicas: defaultReplicas,
	}
	for _, opt := range opts {
		opt(c)
	}
	if c.driver == nil {
		c.runningLocally = true
	}
	if !c.runningLocally {
		c.hashRing = consistenthash.New(c.hashReplicas, nil)
	}

	cr, err := core.New(c.ctx, c.crOptions...)
	if err != nil {
		stop()
		return nil, err
	}
	c.cr = cr
	return c, nil
}

// GetJob 获取指定job
// thisNodeOnly: 只在此节点运行
func (d *Cron) GetJob(jobName string, thisNodeOnly bool) (*JobWrapper, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()

	job, ok := d.jobs[jobName]
	if !ok {
		/* 任务不存在 */
		return nil, ErrJobNotExist
	}
	if !thisNodeOnly || d.runningLocally {
		return job, nil
	}
	isRunningHere, err := d.CheckJobAvailable(jobName)
	if err != nil {
		return nil, err
	}
	if !isRunningHere {
		/* 非此节点运行的job */
		return nil, ErrJobWrongNode
	}
	return job, nil
}

// GetJobs job列表
func (d *Cron) GetJobs(thisNodeOnly bool) []*JobWrapper {
	d.mu.RLock()
	defer d.mu.RUnlock()

	ret := make([]*JobWrapper, 0)
	for _, v := range d.jobs {
		if thisNodeOnly && !d.runningLocally {
			isRunningHere, err := d.CheckJobAvailable(v.Name)
			if err != nil {
				continue
			}

			if isRunningHere {
				/* 此节点运行 */
				ret = append(ret, v)
			}
		} else {
			ret = append(ret, v)
		}
	}
	return ret
}

// AddJob 添加job
func (d *Cron) AddJob(jobName, cronStr string, job core.Job) (err error) {
	return d.addJob(jobName, cronStr, job)
}

// AddFunc 添加执行函数
func (d *Cron) AddFunc(jobName, cronStr string, cmd func()) (err error) {
	return d.addJob(jobName, cronStr, core.FuncJob(cmd))
}

// Remove Job by jobName
func (d *Cron) Remove(jobName string) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if job, ok := d.jobs[jobName]; ok {
		delete(d.jobs, jobName)
		d.cr.Remove(job.ID)
	}
}

// Start job
func (d *Cron) Start() error {
	if d.ctx == nil || d.ctx.Err() != nil {
		/* 已关闭 */
		return errors.New("cron context has closed")
	}

	if atomic.CompareAndSwapInt32(&d.running, cronStopped, cronRunning) {
		if !d.runningLocally {
			/* 启动驱动 */
			err := d.driver.Start(d.ctx, d.events)
			if err != nil {
				atomic.StoreInt32(&d.running, cronStopped)
				return err
			}

			/* 获取集群节点 */
			nodes, err := d.driver.GetNodes()
			if err != nil {
				atomic.StoreInt32(&d.running, cronStopped)
				return err
			}
			for _, v := range nodes {
				d.hashRing.Add(v)
			}

			d.receiveLoop()
		}

		d.cr.Start()
		d.logger.Debugf("cron started, nodeID is %s", d.GetNodeID())
	}
	return nil
}

// Stop 调度停止
func (d *Cron) Stop(t ...time.Duration) {
	d.cr.Stop(t...)

	if !d.runningLocally {
		if d.driver != nil {
			_ = d.driver.Stop()
		}
	}

	d.stopFunc()
	if d.events != nil {
		close(d.events)
	}
}

func (d *Cron) addJob(jobName, cronStr string, job core.Job) (err error) {
	d.logger.Debugf("cron addJob '%s' : %s", jobName, cronStr)
	d.mu.Lock()
	defer d.mu.Unlock()

	if _, ok := d.jobs[jobName]; ok {
		/* 重复job */
		return ErrJobExist
	}
	innerJob := &JobWrapper{Name: jobName, CronStr: cronStr, Job: job, Cron: d}
	entryID, err := d.cr.AddJob(cronStr, innerJob)
	if err != nil {
		return err
	}
	innerJob.ID = entryID
	d.jobs[jobName] = innerJob
	return nil
}

// 判断job是否可以在当前节点运行
func (d *Cron) allowThisNodeRun(jobName string) (ok bool) {
	if d.runningLocally {
		/* 单机运行 直接返回true */
		return true
	}
	ok, err := d.CheckJobAvailable(jobName)

	if err != nil {
		/* job运行判断失败 */
		d.logger.Errorf("allow this node run error, err=%v, job=%v", err, jobName)
		// TODO 实现执行失败job兜底机制
		//if d.recentJobs != nil {
		//	_ = d.recentJobs.AddJob(jobName, time.Now())
		//}
	}
	return
}
